{
  "cells": [
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "aC2QnhmKxpq1"
      },
      "source": [
        "**Please set up your credentials JSON as GCP_CREDENTIALS secrets**"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 2,
      "metadata": {
        "id": "UsUZobVduL7l"
      },
      "outputs": [],
      "source": [
        "import os\n",
        "from google.colab import userdata\n",
        "\n",
        "os.environ[\"DESTINATION__CREDENTIALS\"] = userdata.get('GCP_CREDENTIALS')\n",
        "os.environ[\"BUCKET_URL\"] = \"gs://your_bucket_url\""
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 1,
      "metadata": {
        "id": "mPBzsEgyjsBo"
      },
      "outputs": [],
      "source": [
        "# Install for production\n",
        "%%capture\n",
        "!pip install dlt[bigquery, gs]"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 1,
      "metadata": {
        "id": "evdUsDNbkCTk"
      },
      "outputs": [],
      "source": [
        "# Install for testing\n",
        "%%capture\n",
        "!pip install dlt[duckdb]"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": 2,
      "metadata": {
        "id": "lYh7r1mTf4uo"
      },
      "outputs": [],
      "source": [
        "import dlt\n",
        "import requests\n",
        "import pandas as pd\n",
        "from dlt.destinations import filesystem\n",
        "from io import BytesIO"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "76zT1PzAgs7A"
      },
      "source": [
        "Ingesting parquet files to GCS."
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "id": "xya0215jsnsb"
      },
      "outputs": [],
      "source": [
        "# Define a dlt source to download and process Parquet files as resources\n",
        "@dlt.source(name=\"rides\")\n",
        "def download_parquet():\n",
        "     for month in range(1,7):\n",
        "      file_name = f\"yellow_tripdata_2024-0{month}.parquet\"\n",
        "\n",
        "      url = f\"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-0{month}.parquet\"\n",
        "      response = requests.get(url)\n",
        "\n",
        "      df = pd.read_parquet(BytesIO(response.content))\n",
        "\n",
        "      # Return the dataframe as a dlt resource for ingestion\n",
        "      yield dlt.resource(df, name=file_name)\n",
        "\n",
        "# Initialize the pipeline\n",
        "pipeline = dlt.pipeline(\n",
        "    pipeline_name=\"rides_pipeline\",\n",
        "    destination=filesystem(\n",
        "      layout=\"{schema_name}/{table_name}.{ext}\"\n",
        "    ),\n",
        "    dataset_name=\"rides_dataset\"\n",
        ")\n",
        "\n",
        "# Run the pipeline to load Parquet data into DuckDB\n",
        "load_info = pipeline.run(\n",
        "    download_parquet(),\n",
        "    loader_file_format=\"parquet\"\n",
        "    )\n",
        "\n",
        "# Print the results\n",
        "print(load_info)\n"
      ]
    },
    {
      "cell_type": "markdown",
      "metadata": {
        "id": "S0310FT-gy_P"
      },
      "source": [
        "Ingesting data to Database"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "1_3K97w1c2v2",
        "outputId": "4b2d26bf-2814-46fa-f80d-7a2e17417a95"
      },
      "outputs": [],
      "source": [
        "# Define a dlt resource to download and process Parquet files as single table\n",
        "@dlt.resource(name=\"rides\", write_disposition=\"replace\")\n",
        "def download_parquet():\n",
        "     for month in range(1,7):\n",
        "      url = f\"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-0{month}.parquet\"\n",
        "      response = requests.get(url)\n",
        "\n",
        "      df = pd.read_parquet(BytesIO(response.content))\n",
        "\n",
        "      # Return the dataframe as a dlt resource for ingestion\n",
        "      yield df\n",
        "\n",
        "# Initialize the pipeline\n",
        "pipeline = dlt.pipeline(\n",
        "    pipeline_name=\"rides_pipeline\",\n",
        "    destination=\"duckdb\",  # Use DuckDB for testing\n",
        "    # destination=\"bigquery\",  # Use BigQuery for production\n",
        "    dataset_name=\"rides_dataset\"\n",
        ")\n",
        "\n",
        "# Run the pipeline to load Parquet data into DuckDB\n",
        "info = pipeline.run(download_parquet)\n",
        "\n",
        "# Print the results\n",
        "print(info)\n"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "gDcLjzLtooBV",
        "outputId": "74ff2de7-2f2e-41b9-a681-3dc5887f6eed"
      },
      "outputs": [],
      "source": [
        "import duckdb\n",
        "conn = duckdb.connect(f\"{pipeline.pipeline_name}.duckdb\")\n",
        "\n",
        "# Set search path to the dataset\n",
        "conn.sql(f\"SET search_path = '{pipeline.dataset_name}'\")\n",
        "\n",
        "# Describe the dataset to see loaded tables\n",
        "res = conn.sql(\"DESCRIBE\").df()\n",
        "print(res)"
      ]
    },
    {
      "cell_type": "code",
      "execution_count": null,
      "metadata": {
        "colab": {
          "base_uri": "https://localhost:8080/"
        },
        "id": "VVJy8JoerI2P",
        "outputId": "3f8c7fee-a9ee-4fd4-ec75-153ca60bd36f"
      },
      "outputs": [],
      "source": [
        "# provide a resource name to query a table of that name\n",
        "with pipeline.sql_client() as client:\n",
        "    with client.execute_query(f\"SELECT count(1) FROM rides\") as cursor:\n",
        "        data = cursor.df()\n",
        "print(data)"
      ]
    }
  ],
  "metadata": {
    "colab": {
      "provenance": []
    },
    "kernelspec": {
      "display_name": "Python 3",
      "name": "python3"
    },
    "language_info": {
      "name": "python"
    }
  },
  "nbformat": 4,
  "nbformat_minor": 0
}
